This time you'll find yourself delving into the heart (and other innards) of recurrent neural networks on a class of toy problems.
Struggling to find a name for a variable? Now imagine having to pick a name for your son or daughter. Surely no human is an expert on what makes a good child name, so let's train an RNN instead.
It's dangerous to go alone! Take these:
In [ ]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
import os
#thanks @keskarnitish
In [ ]:
start_token = " "
with open("names") as f:
names = f.read()[:-1].split('\n')
names = [start_token+name for name in names]
In [ ]:
print('n samples = ', len(names))
for x in names[::1000]:
    print(x)
In [ ]:
# all unique characters go here
tokens = <all unique characters in the dataset>
tokens = list(tokens)
print('n_tokens = ', len(tokens))
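If you're stuck on the blank above, here's one possible sketch (any equivalent approach works; all you need is the set of distinct characters, including the space start token):
tokens = set(''.join(names))  # every distinct character in the corpus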
Theano is built for numbers, not strings of characters, so we'll feed our recurrent neural network character ids from a dictionary instead.
To create such a dictionary, let's assign each character its index in the tokens list.
In [ ]:
token_to_id = <YOUR CODE: dictionary of symbol -> its identifier (index in tokens list)>
id_to_token = <dictionary of symbol identifier -> symbol itself>
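A minimal sketch of the two mappings (assuming tokens is the list built above):
token_to_id = {token: i for i, token in enumerate(tokens)}
id_to_token = {i: token for i, token in enumerate(tokens)}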
In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.hist(list(map(len, names)), bins=25)
In [ ]:
# truncate names longer than MAX_LEN characters.
MAX_LEN = <maximum name length to keep>
# you will likely need to change this for any dataset different from "names"
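One possible choice, given that the histogram above shows most names are fairly short (the cutoff below is an arbitrary example, not the required answer):
MAX_LEN = min(16, max(map(len, names)))  # example cutoff; tune for your dataset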
In [ ]:
names_ix = list(map(lambda name: list(map(token_to_id.get, name)), names))
# crop long names and pad short ones
for i in range(len(names_ix)):
    names_ix[i] = names_ix[i][:MAX_LEN]  # crop too long
    if len(names_ix[i]) < MAX_LEN:
        names_ix[i] += [token_to_id[" "]] * \
            (MAX_LEN - len(names_ix[i]))  # pad too short
assert len(set(map(len, names_ix))) == 1
names_ix = np.array(names_ix)
In [ ]:
input_sequence = T.matrix('token sequence', 'int32')
target_values = T.matrix('actual next token', 'int32')
In [ ]:
from lasagne.layers import InputLayer, DenseLayer, EmbeddingLayer
from lasagne.layers import RecurrentLayer, LSTMLayer, GRULayer, CustomRecurrentLayer
In [ ]:
l_in = lasagne.layers.InputLayer(shape=(None, None), input_var=input_sequence)
# <Your neural network>
l_emb = <embedding layer or one-hot encoding>
l_rnn = <some recurrent layer(or several such layers)>
# flatten batch and time to be compatible with feedforward layers (will un-flatten later)
l_rnn_flat = lasagne.layers.reshape(l_rnn, (-1, l_rnn.output_shape[-1]))
l_out = <last dense layer(or several layers), returning probabilities for all possible next tokens>
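One way to fill in the blanks, purely as a sketch (the layer sizes are arbitrary assumptions; a GRULayer or a stack of several recurrent layers would work just as well):
l_emb = EmbeddingLayer(l_in, input_size=len(tokens), output_size=16)  # char id -> 16-dim vector
l_rnn = LSTMLayer(l_emb, num_units=128)  # outputs (batch, time, 128)
l_rnn_flat = lasagne.layers.reshape(l_rnn, (-1, l_rnn.output_shape[-1]))  # same flattening as above
l_out = DenseLayer(l_rnn_flat, num_units=len(tokens),
                   nonlinearity=lasagne.nonlinearities.softmax)  # next-token probabilities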
In [ ]:
# Model weights
weights = lasagne.layers.get_all_params(l_out, trainable=True)
print(weights)
In [ ]:
network_output = <NN output via lasagne>
# If you use dropout, don't forget to create a deterministic version for evaluation
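A minimal sketch, assuming l_out from the network cell above (the deterministic pass only matters if your architecture includes dropout or other stochastic layers):
network_output = lasagne.layers.get_output(l_out)
# e.g. if you added dropout, build a deterministic pass for evaluation/sampling:
# network_output_det = lasagne.layers.get_output(l_out, deterministic=True)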
In [ ]:
predicted_probabilities_flat = network_output
correct_answers_flat = target_values.ravel()
loss = <loss function - a simple categorical crossentropy will do, maybe add some regularizer>
updates = <your favorite optimizer>
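One reasonable choice, sketched under the assumption that network_output holds the flat per-token softmax probabilities (the optimizer and learning rate are arbitrary picks):
loss = lasagne.objectives.categorical_crossentropy(
    predicted_probabilities_flat, correct_answers_flat).mean()
updates = lasagne.updates.adam(loss, weights, learning_rate=1e-3)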
In [ ]:
# training
train = theano.function([input_sequence, target_values],
                        loss, updates=updates, allow_input_downcast=True)
# computing loss without training
compute_cost = theano.function(
    [input_sequence, target_values], loss, allow_input_downcast=True)
In [ ]:
# compile the function that computes probabilities for next token given previous text.
# reshape back into original shape
next_word_probas = network_output.reshape(
    (input_sequence.shape[0], input_sequence.shape[1], len(tokens)))
# predictions for next tokens (after sequence end)
last_word_probas = next_word_probas[:, -1]
probs = theano.function(
    [input_sequence], last_word_probas, allow_input_downcast=True)
In [ ]:
def generate_sample(seed_phrase=None, N=MAX_LEN, t=1, n_snippets=1):
    '''
    Generates text continuing seed_phrase (at most the last MAX_LEN characters are used as context).
    parameters:
        seed_phrase - prefix to continue; defaults to the start token
        N - number of characters to generate
        t - sampling sharpness: probabilities are raised to the power t and renormalized,
            so t=1 samples from the model distribution as-is and larger t is greedier
        n_snippets - number of samples to generate
    '''
    if seed_phrase is None:
        seed_phrase = start_token
    if len(seed_phrase) > MAX_LEN:
        seed_phrase = seed_phrase[-MAX_LEN:]
    assert type(seed_phrase) is str

    snippets = []
    for _ in range(n_snippets):
        sample_ix = []
        x = list(map(lambda c: token_to_id.get(c, 0), seed_phrase))
        x = np.array([x])

        for i in range(N):
            # Sample the next character from the predicted distribution
            p = probs(x).ravel()
            p = p**t / np.sum(p**t)
            ix = np.random.choice(np.arange(len(tokens)), p=p)
            sample_ix.append(ix)

            # append the sampled token, keeping at most MAX_LEN last tokens as context
            x = np.hstack((x[:, -MAX_LEN + 1:], [[ix]]))

        random_snippet = seed_phrase + \
            ''.join(id_to_token[ix] for ix in sample_ix)
        snippets.append(random_snippet)

    print("----\n %s \n----" % '; '.join(snippets))
In [ ]:
def sample_batch(data, batch_size):
    rows = data[np.random.randint(0, len(data), size=batch_size)]
    return rows[:, :-1], rows[:, 1:]
In [ ]:
print("Training ...")
# total number of training epochs
n_epochs = 100
# how many minibatches there are in one epoch
batches_per_epoch = 500
# how many training sequences are processed in a single function call
batch_size = 10
for epoch in range(n_epochs):
    print("Generated names")
    generate_sample(n_snippets=10)

    avg_cost = 0
    for _ in range(batches_per_epoch):
        x, y = sample_batch(names_ix, batch_size)
        avg_cost += train(x, y)

    print("Epoch {} average loss = {}".format(
        epoch, avg_cost / batches_per_epoch))
In [ ]:
generate_sample(n_snippets=100)
In [ ]:
generate_sample(seed_phrase=" A", n_snippets=10)
In [ ]:
generate_sample(seed_phrase=<whatever you please>, n_snippets=10, t=1.0)
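For example (the seed below is arbitrary; any prefix works, though the training names all begin with the space start token):
generate_sample(seed_phrase=" Deep", n_snippets=10, t=1.0)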
You've just implemented a recurrent language model that can be tasked with generating any kind of sequence, so there's plenty of data you can try it on.
If you're willing to give it a try and want to scrape a dataset of your own, take a look at Selenium or Scrapy for that. Good hunting!